# Solutions/BitSight/Data Connectors/BitSightDataConnector/AlertsGraphStatisticsDetails/bitsight_statistics.py
"""Module for fetching BitSight Alert, Graph and Statistics details and posting to Sentinel."""
import time
from datetime import datetime
import json
import hashlib
from ..SharedCode.logger import applogger
from ..SharedCode.bitsight_client import BitSight
from ..SharedCode.bitsight_exception import BitSightException
from ..SharedCode.get_logs_data import get_logs_data
from ..SharedCode.utils import CheckpointManager
from ..SharedCode.consts import (
API_TOKEN,
ALERTS_PAGE_SIZE,
DILIGENCE_STATISCTICS_TABLE,
INDUSTRIAL_STATISCTICS_TABLE,
OBSERVATION_STATISCTICS_TABLE,
DILIGENCE_HISTORICAL_STATISTICS_TABLE,
GRAPH_DATA_TABLE,
ALERTS_DATA_TABLE,
COMPANIES,
ENDPOINTS,
ALERT_GRAPH_STATISTICS_FUNC_NAME
)
class BitSightStatistics(BitSight):
"""Class for fetching BitSight Alert, Graph and Statistics details and posting it to Sentinel."""
def __init__(self, start_time) -> None:
"""Initialize BitSightStatistics object.
Args:
            start_time (float): Epoch time, in seconds, at which the current function invocation started.
"""
super().__init__()
self.start_time = start_time
self.check_env_var = self.check_environment_var_exist(
[
{"api_token": API_TOKEN},
{"diligence_statistics_table_name": DILIGENCE_STATISCTICS_TABLE},
{"industrial_statistics_table_name": INDUSTRIAL_STATISCTICS_TABLE},
{"observation_statistics_table_name": OBSERVATION_STATISCTICS_TABLE},
{
"diligence_historical_statistics_table_name": DILIGENCE_HISTORICAL_STATISTICS_TABLE
},
{"graph_data_table_name": GRAPH_DATA_TABLE},
{"alerts_data_table_name": ALERTS_DATA_TABLE},
{"companies_list": COMPANIES},
]
)
self.checkpoint_obj = CheckpointManager()
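        # Load a separate checkpoint state per endpoint so each data type
        # (statistics, graph, alerts) can resume independently across invocations.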
self.company_state = self.checkpoint_obj.get_state("statisctics_company")
self.diligence_statistics_state = self.checkpoint_obj.get_state(
"diligence_statistics"
)
self.industries_statistics_state = self.checkpoint_obj.get_state(
"industries_statistics"
)
self.observations_statistics_state = self.checkpoint_obj.get_state(
"observations_statistics"
)
self.diligence_historical_statistics_state = self.checkpoint_obj.get_state(
"diligence_historical_statistics"
)
self.graph_state = self.checkpoint_obj.get_state("graph_data")
self.alerts_state = self.checkpoint_obj.get_state("alerts_data")
self.date_format = "%Y-%m-%d"
self.diligence_statistics_path = ENDPOINTS["diligence_statistics_url"]
self.industrial_statistics_path = ENDPOINTS["industries_statistics_url"]
self.observation_statistics_path = ENDPOINTS["observations_statistics_url"]
self.diligence_historical_statistics_path = ENDPOINTS[
"diligence_historical-statistics_url"
]
self.graph_data_path = ENDPOINTS["graph_data_url"]
self.alerts_details_path = ENDPOINTS["alerts_url"]
self.companies_str = COMPANIES
self.generate_auth_token()
def get_risk_vector_data(
self, endpoint, endpoint_path, company_name, company_guid, state, table_name
):
"""Fetch risk vector data for a specific endpoint and post it to Sentinel.
Args:
endpoint (str): Name of the endpoint.
endpoint_path (str): Path of the endpoint.
company_name (str): Name of the company.
company_guid (str): GUID of the company.
            state (object): Checkpoint state for the endpoint, used to persist the data already posted.
table_name (str): Name of the table.
Raises:
BitSightException: Raises exception if any error occurs.
"""
try:
data_to_post = []
risk_vector_data = []
checkpoint_key = "{}".format(company_guid)
checkpoint_data = self.checkpoint_obj.get_last_data(state, table_name=table_name)
last_data = self.checkpoint_obj.get_endpoint_last_data(
checkpoint_data, endpoint, checkpoint_key
)
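            # last_data holds the hashes of risk-vector records already posted for
            # this company and endpoint; it is used below to skip duplicates.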
url = self.base_url + endpoint_path
res_list = self.get_bitsight_data(url)
if not res_list:
return
if res_list.get("risk_vectors"):
risk_data = res_list.get("risk_vectors")
risk_vectors = risk_data.keys()
for risk_vector in risk_vectors:
risk_vector_detail = risk_data.get(risk_vector)
risk_vector_detail["risk_vector"] = risk_vector
risk_vector_detail["Company_name"] = company_name
data = json.dumps(risk_vector_detail, sort_keys=True)
result = hashlib.sha512(data.encode("utf-8"))
result_hash = result.hexdigest()
if last_data:
if result_hash not in last_data:
risk_vector_data.append(risk_vector_detail)
data_to_post.append(result_hash)
else:
risk_vector_data.append(risk_vector_detail)
data_to_post.append(result_hash)
self.send_data_to_sentinel(
risk_vector_data, table_name, company_name, endpoint
)
self.checkpoint_obj.save_checkpoint(
state, checkpoint_data, endpoint, "{}_{}".format(table_name, "Checkpoint"), checkpoint_key, data_to_post
)
except BitSightException:
raise BitSightException()
except Exception as err:
applogger.exception(
"{} {} Error getting risk vector: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
)
)
raise BitSightException()
def get_results_hash(self, data, p_data, company_name):
"""Generate a hash value for BitSight statistics results.
Args:
data (dict): The data to be included in the hash.
p_data (dict): Additional data, such as date and grade, to be included in the hash.
company_name (str): The name of the company associated with the data.
Raises:
BitSightException: If an error occurs during the hashing process.
Returns:
str: The SHA-512 hash value generated for the combined data.
"""
try:
data["date"] = p_data.get("date")
data["grade"] = p_data.get("grade")
data["company_name"] = company_name
data = json.dumps(data, sort_keys=True)
result = hashlib.sha512(data.encode())
result_hash = result.hexdigest()
return result_hash
except Exception as error:
applogger.exception(
"{} {} Error in hashing: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, error
)
)
raise BitSightException()
def get_diligence_historical_statistics_details(self, company_name, company_guid):
"""Fetch and process diligence historical statistics for a specific company.
Args:
company_name (str): The name of the company.
company_guid (str): The GUID of the company.
Raises:
BitSightException: If an error occurs during the process.
"""
try:
checkpoint_data_to_post = []
post_data = []
checkpoint_key = "{}".format(company_guid)
checkpoint_data = self.checkpoint_obj.get_last_data(
self.diligence_historical_statistics_state, table_name=DILIGENCE_HISTORICAL_STATISTICS_TABLE
)
last_data = self.checkpoint_obj.get_endpoint_last_data(
checkpoint_data, "diligence_historical-statistics", company_guid
)
url = self.base_url + self.diligence_historical_statistics_path.format(
company_guid
)
response = self.get_bitsight_data(url)
if not response:
return
if response.get("results"):
results_data = response.get("results")
for data in results_data:
if data.get("counts"):
count_data = data.get("counts")
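                        # Each dated result carries per-category counts; hash every
                        # count entry together with its date and grade to detect
                        # entries that were already posted.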
for count_category in count_data:
result_hash = self.get_results_hash(
count_category, data, company_name
)
                            if not last_data or result_hash not in last_data:
count_category["grade"] = data["grade"]
count_category["date"] = data["date"]
post_data.append(count_category)
checkpoint_data_to_post.append(result_hash)
else:
checkpoint_data_to_post.append(result_hash)
self.send_data_to_sentinel(
post_data,
DILIGENCE_HISTORICAL_STATISTICS_TABLE,
company_name,
"diligence historical statistics",
)
self.checkpoint_obj.save_checkpoint(
self.diligence_historical_statistics_state,
checkpoint_data,
"diligence_historical-statistics",
"{}_{}".format(DILIGENCE_HISTORICAL_STATISTICS_TABLE, "Checkpoint"),
checkpoint_key,
checkpoint_data_to_post,
)
except BitSightException:
raise BitSightException()
except Exception as err:
applogger.exception(
"{} {} Error in getting diligence historical statistics: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
)
)
raise BitSightException()
def get_graph_data(self, company_name, company_guid):
"""Fetch and process graph data for a specific company.
Args:
company_name (str): The name of the company.
company_guid (str): The GUID of the company.
Raises:
BitSightException: If an error occurs during the process.
"""
try:
data_to_post = []
post_data = []
last_rating = None
current_rating = None
rating_diff = None
last_date = None
checkpoint_key = "{}".format(company_guid)
checkpoint_data = self.checkpoint_obj.get_last_data(self.graph_state, table_name=GRAPH_DATA_TABLE)
last_data = self.checkpoint_obj.get_endpoint_last_data(
checkpoint_data, "graph_data", company_guid
)
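            # The graph checkpoint stores [last posted Rating_Date, last Rating],
            # which seeds the rating-difference calculation below.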
if last_data:
last_date = datetime.strptime(last_data[0], self.date_format)
last_rating = last_data[1]
url = self.base_url + self.graph_data_path.format(company_guid)
response = self.get_bitsight_data(url)
if not response:
return
if response.get("ratings"):
rating_data = response["ratings"]
sorted_ratings = sorted(rating_data, key=lambda i: i["x"])
count = 0
                # Walk the ratings in date order and compute the day-over-day rating
                # difference; when a checkpoint exists, only dates newer than the last
                # posted date are sent to Sentinel.
for rating in sorted_ratings:
rating["Rating_Date"] = rating.pop("x")
rating["Rating"] = rating.pop("y")
rating["Company_name"] = company_name
rating["Rating_differance"] = None
date_in_object = datetime.strptime(
rating["Rating_Date"], self.date_format
)
if last_data:
if date_in_object.date() > last_date.date():
current_rating = rating["Rating"]
rating_diff = int(current_rating) - int(last_rating)
rating["Rating_differance"] = rating_diff
last_rating = current_rating
post_data.append(rating)
data_to_post = [rating["Rating_Date"], rating["Rating"]]
else:
data_to_post = [str(last_date.date()), rating["Rating"]]
else:
if count == 0:
last_rating = rating["Rating"]
rating["Rating_differance"] = rating_diff
else:
current_rating = rating["Rating"]
rating_diff = int(current_rating) - int(last_rating)
rating["Rating_differance"] = rating_diff
last_rating = current_rating
post_data.append(rating)
data_to_post = [rating["Rating_Date"], rating["Rating"]]
count += 1
self.send_data_to_sentinel(
post_data, GRAPH_DATA_TABLE, company_name, "graph"
)
self.checkpoint_obj.save_checkpoint(
self.graph_state,
checkpoint_data,
"graph_data",
"{}_{}".format(GRAPH_DATA_TABLE, "Checkpoint"),
checkpoint_key,
data_to_post,
)
except BitSightException:
raise BitSightException()
except KeyError as err:
applogger.error(
"{} {} KeyError: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
)
)
raise BitSightException()
except Exception as err:
applogger.exception(
"{} {} Error in getting graph data: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
)
)
raise BitSightException()
def get_alerts_details(self, company_name, company_guid):
"""Fetch and process alerts details for a specific company.
Args:
company_name (str): The name of the company.
company_guid (str): The GUID of the company.
Raises:
BitSightException: If an error occurs during the process.
"""
try:
data_to_post = None
checkpoint_key = "{}".format(company_guid)
checkpoint_data = self.checkpoint_obj.get_last_data(self.alerts_state, table_name=ALERTS_DATA_TABLE)
last_date = self.checkpoint_obj.get_endpoint_last_data(
checkpoint_data, "alerts_data", company_guid
)
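            # The alerts checkpoint stores the alert_date of the last alert posted
            # for this company; only alerts newer than that date are sent.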
if last_date:
last_date = datetime.strptime(last_date, self.date_format).date()
query_parameter = {"limit": ALERTS_PAGE_SIZE, "sort": "alert_date"}
url = self.base_url + self.alerts_details_path.format(company_guid)
response = self.get_bitsight_data(url, query_parameter)
if not response:
return
            next_link = response.get("links", {}).get("next")
            alerts_data = []
            query_parameter["offset"] = 0
            # Follow the paginated "next" links, extending the first response's
            # results with each subsequent page until no further page remains.
            while next_link:
                query_parameter["offset"] += query_parameter.get("limit")
                next_page = self.get_bitsight_data(url, query_parameter)
                if not next_page:
                    break
                response.get("results").extend(next_page.get("results", []))
                next_link = next_page.get("links", {}).get("next")
is_alert_exist = False
for alert in response["results"]:
current_date = alert["alert_date"]
if last_date:
if alert["company_name"].lower() == company_name.lower():
is_alert_exist = True
current_date = datetime.strptime(
current_date, self.date_format
).date()
if current_date > last_date:
data_to_post = alert["alert_date"]
alerts_data.append(alert)
else:
data_to_post = str(last_date)
else:
if alert["company_name"].lower() == company_name.lower():
is_alert_exist = True
data_to_post = alert["alert_date"]
alerts_data.append(alert)
if alerts_data:
self.send_data_to_sentinel(
alerts_data, ALERTS_DATA_TABLE, company_name, "alerts_data"
)
else:
if is_alert_exist:
applogger.info(
"{} {} The data of alerts for {} company is already exist.".format(
self.logs_starts_with,
ALERT_GRAPH_STATISTICS_FUNC_NAME,
company_name,
)
)
else:
applogger.info(
"{} {} No alert data found for {} company from BitSight.".format(
self.logs_starts_with,
ALERT_GRAPH_STATISTICS_FUNC_NAME,
company_name,
)
)
self.checkpoint_obj.save_checkpoint(
self.alerts_state,
checkpoint_data,
"alerts_data",
"{}_{}".format(ALERTS_DATA_TABLE, "Checkpoint"),
checkpoint_key,
data_to_post,
)
except BitSightException:
raise BitSightException()
except Exception as err:
            applogger.exception(
                "{} {} Error in getting alerts details: {}".format(
                    self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
                )
            )
raise BitSightException()
def get_all_copmanies_alerts_graph_statisctics_details(
self, logs_data, company_names
):
"""Fetch alerts, graph, and statistics details for all companies.
Args:
logs_data (list): List of log data.
company_names (list): List of company names.
"""
fetching_index = self.get_last_data_index(
company_names, self.checkpoint_obj, self.company_state, table_name="{}_{}".format(ALERTS_DATA_TABLE, "Statistics")
)
for company_index in range(fetching_index + 1, len(logs_data)):
company_name = logs_data[company_index].get("name_s")
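            # Stop after 540 seconds (9 minutes) so the function exits cleanly before
            # the Azure Functions execution timeout; the company checkpoint saved below
            # lets the next invocation resume where this one stopped.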
if int(time.time()) >= self.start_time + 540:
applogger.info(
"{} {} 9:00 mins executed hence breaking. In next iteration, start fetching from {}.".format(
self.logs_starts_with,
ALERT_GRAPH_STATISTICS_FUNC_NAME,
company_name,
)
)
break
company_guid = logs_data[company_index].get("guid_g")
self.get_company_alerts_graph_statisctics_details(
company_name, company_guid
)
self.checkpoint_obj.save_checkpoint(
self.company_state,
company_name,
"statisctics_company",
"{}_{}".format(ALERTS_DATA_TABLE, "Statistics_Company_Checkpoint"),
company_name_flag=True,
)
def get_specified_companies_alerts_graph_statisctics_details(
self, logs_data, company_names
):
"""Fetch alerts, graph, and statistics details for specified companies.
Args:
logs_data (list): List of log data.
company_names (list): List of company names.
"""
applogger.debug(
"{} {} Fetching data for specified company names.".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME
)
)
companies_to_get = self.get_specified_companies_list(
company_names, self.companies_str
)
company_names = list(map(str.lower, company_names))
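        # companies_to_get holds the configured company names in lowercase; their
        # index in the lowercased company_names list locates the matching record
        # (name and GUID) in logs_data.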
for company in companies_to_get:
if int(time.time()) >= self.start_time + 540:
applogger.info(
"{} {} 9:00 mins executed hence breaking. In next iteration, start fetching after {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, company
)
)
break
index = company_names.index(company)
company_name = logs_data[index].get("name_s")
company_guid = logs_data[index].get("guid_g")
self.get_company_alerts_graph_statisctics_details(
company_name, company_guid
)
def get_company_alerts_graph_statisctics_details(self, company_name, company_guid):
"""Fetch alerts, graph, and statistics details for a specific company.
Args:
company_name (str): Name of the company.
company_guid (str): GUID of the company.
"""
self.get_risk_vector_data(
"diligence_statistics",
self.diligence_statistics_path.format(company_guid),
company_name,
company_guid,
self.diligence_statistics_state,
DILIGENCE_STATISCTICS_TABLE,
)
self.get_risk_vector_data(
"industries_statistics",
self.industrial_statistics_path.format(company_guid),
company_name,
company_guid,
self.industries_statistics_state,
INDUSTRIAL_STATISCTICS_TABLE,
)
self.get_risk_vector_data(
"observations_statistics",
self.observation_statistics_path.format(company_guid),
company_name,
company_guid,
self.observations_statistics_state,
OBSERVATION_STATISCTICS_TABLE,
)
self.get_diligence_historical_statistics_details(company_name, company_guid)
self.get_graph_data(company_name, company_guid)
self.get_alerts_details(company_name, company_guid)
def get_bitsight_data_into_sentinel(self):
"""Fetch data from BitSight and post it to Sentinel."""
try:
if not self.check_env_var:
raise BitSightException(
"{} {} Some Environment variables are not set hence exiting the app.".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME
)
)
applogger.info(
"{} {} Fetching companies from companies table.".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME
)
)
logs_data, flag = get_logs_data()
if not flag:
applogger.info(
"{} {} Companies are not available yet.".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME
)
)
return
applogger.info(
"{} {} Fetched companies from companies table.".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME
)
)
logs_data = sorted(logs_data, key=lambda x: x["name_s"])
company_names = [data["name_s"] for data in logs_data]
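            # When the COMPANIES setting is "all", every company in the table is
            # processed; otherwise only the configured companies are fetched.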
            if self.companies_str.strip().lower() == "all":
self.get_all_copmanies_alerts_graph_statisctics_details(
logs_data, company_names
)
else:
self.get_specified_companies_alerts_graph_statisctics_details(
logs_data, company_names
)
except BitSightException:
raise BitSightException()
except Exception as err:
applogger.exception(
"{} {} Error: {}".format(
self.logs_starts_with, ALERT_GRAPH_STATISTICS_FUNC_NAME, err
)
)
raise BitSightException()
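# Hedged usage sketch (illustrative only): the Azure Function entry point for this
# connector is expected to construct the class with the invocation start time and
# trigger ingestion roughly as follows; the surrounding entry-point module is assumed:
#
#     import time
#     start_time = time.time()
#     BitSightStatistics(start_time).get_bitsight_data_into_sentinel()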